/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.handler;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.tests.util.LuceneTestCase.Nightly;
import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Nightly
// Backups do checksum validation against a footer value not present in 'SimpleText'
@SuppressCodecs({"SimpleText"})
public class TestStressThreadBackup extends SolrCloudTestCase {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private static final Pattern ENDS_WITH_INT_DIGITS = Pattern.compile("\\d+$");
  private Path backupDir;
  private SolrClient adminClient;
  private SolrClient coreClient;
  private String coreName;

  @BeforeClass
  public static void beforeClass() {
    System.setProperty("solr.security.allow.paths", "*");
  }

  @AfterClass
  public static void afterClass() {
    System.clearProperty("solr.security.allow.paths");
  }

  @Before
  public void beforeTest() throws Exception {
    backupDir = createTempDir(getTestClass().getSimpleName() + "_backups");

    // NOTE: we don't actually care about using SolrCloud, but we want to use SolrClient and I can't
    // bring myself to deal with the nonsense that is SolrJettyTestBase.

    // We do however explicitly want a fresh "cluster" every time a test is run
    configureCluster(1)
        .addConfig(
            "conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
        .configure();

    assertEquals(
        0,
        (CollectionAdminRequest.createCollection(DEFAULT_TEST_COLLECTION_NAME, "conf1", 1, 1)
            .process(cluster.getSolrClient())
            .getStatus()));
    adminClient = getHttpSolrClient(cluster.getJettySolrRunners().get(0).getBaseUrl().toString());
    initCoreNameAndSolrCoreClient();
  }

  @After
  public void afterTest() throws Exception {
    // we use a clean cluster instance for every test, so we need to clean it up
    shutdownCluster();

    if (null != adminClient) {
      adminClient.close();
    }
    if (null != coreClient) {
      coreClient.close();
    }
  }

  @Test
  public void testCoreAdminHandler() throws Exception {
    // Use default BackupAPIImpl which hits CoreAdmin API for everything
    testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(new BackupAPIImpl());
  }

  @Test
  public void testReplicationHandler() throws Exception {
    // Create a custom BackupAPIImpl which uses ReplicationHandler for the backups
    // but still defaults to CoreAdmin for making named snapshots (since that's what's documented)
    testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(
        new BackupAPIImpl() {
          final BackupStatusChecker backupStatus = new BackupStatusChecker(coreClient);

          /** no solrj API for ReplicationHandler */
          private SolrRequest<?> makeReplicationReq(SolrParams p) {
            return new GenericSolrRequest(
                    GenericSolrRequest.METHOD.POST,
                    "/replication",
                    SolrRequest.SolrRequestType.ADMIN,
                    p)
                .setRequiresCollection(true);
          }

          /**
           * Override default backup impl to hit ReplicationHandler, and then poll that same handler
           * until success
           */
          @Override
          public void makeBackup(final String backupName, final String snapName) throws Exception {
            final TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
            ModifiableSolrParams p =
                params(
                    "command",
                    "backup",
                    "name",
                    backupName,
                    CoreAdminParams.BACKUP_LOCATION,
                    backupDir.toString());
            if (null != snapName) {
              p.add(CoreAdminParams.COMMIT_NAME, snapName);
            }
            makeReplicationReq(p).process(coreClient);
            backupStatus.waitForBackupSuccess(backupName, timeout);
          }
        });
  }

  @SuppressWarnings("AssertionFailureIgnored") // failure happens inside a thread
  public void testSnapshotsAndBackupsDuringConcurrentCommitsAndOptimizes(final BackupAPIImpl impl)
      throws Exception {
    final int numBackupIters = 20; // don't use 'atLeast', we don't want to blow up on nightly

    final AtomicReference<Throwable> heavyCommitFailure = new AtomicReference<>();
    final AtomicBoolean keepGoing = new AtomicBoolean(true);

    // this thread will do nothing but add/commit new 'dummy' docs over and over again as fast as
    // possible to create a lot of index churn w/ segment merging
    final Thread heavyCommitting =
        new Thread() {
          @Override
          public void run() {
            try {
              int docIdCounter = 0;
              while (keepGoing.get()) {
                docIdCounter++;

                final UpdateRequest req =
                    new UpdateRequest().add(makeDoc("dummy_" + docIdCounter, "dummy"));
                // always commit to force lots of new segments
                req.setParam(UpdateParams.COMMIT, "true");
                req.setParam(UpdateParams.OPEN_SEARCHER, "false"); // we don't care about searching

                // frequently forceMerge to ensure segments are frequently deleted
                if (0 == (docIdCounter % 13)) { // arbitrary
                  req.setParam(UpdateParams.OPTIMIZE, "true");
                  req.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "5"); // arbitrary
                }

                log.info("Heavy Committing #{}: {}", docIdCounter, req);
                final UpdateResponse rsp = req.process(coreClient);
                assertEquals(
                    "Dummy Doc#" + docIdCounter + " add status: " + rsp, 0, rsp.getStatus());
              }
            } catch (Throwable t) {
              heavyCommitFailure.set(t);
            }
          }
        };

    heavyCommitting.start();
    try {
      // now have the "main" test thread try to take a series of backups/snapshots
      // while adding other "real" docs

      final Queue<String> namedSnapshots = new ArrayDeque<>();

      // NOTE #1: start at i=1 for 'id' & doc counting purposes...
      // NOTE #2: abort quickly if the other thread reports a heavyCommitFailure...
      for (int i = 1; (i <= numBackupIters && null == heavyCommitFailure.get()); i++) {

        // in each iteration '#i', the commit we create should have exactly 'i' documents in
        // it with the term 'type_s:real' (regardless of what the other thread does with dummy docs)

        // add & commit a doc #i
        final UpdateRequest req = new UpdateRequest().add(makeDoc("doc_" + i, "real"));
        req.setParam(UpdateParams.COMMIT, "true"); // make immediately available for backup
        req.setParam(UpdateParams.OPEN_SEARCHER, "false"); // we don't care about searching

        final UpdateResponse rsp = req.process(coreClient);
        assertEquals("Real Doc#" + i + " add status: " + rsp, 0, rsp.getStatus());

        // create a backup of the 'current' index
        impl.makeBackup("backup_currentAt_" + i);

        // verify backup is valid and has the number of 'real' docs we expect...
        validateBackup("backup_currentAt_" + i);

        // occasionally make a "snapshot_i", add it to 'namedSnapshots'
        // NOTE: we don't want to do this too often, or the SnapShotMetadataManager will protect
        // too many segment files "long term".  It's more important to stress the thread contention
        // between backups calling save/release vs the DelPolicy trying to delete segments
        if (0 == random().nextInt(7 + namedSnapshots.size())) {
          final String snapshotName = "snapshot_" + i;
          log.info("Creating snapshot: {}", snapshotName);
          impl.makeSnapshot(snapshotName);
          namedSnapshots.add(snapshotName);
        }

        // occasionally make a backup of a snapshot and remove it
        // the odds of doing this increase based on how many snapshots currently exist,
        // and how few iterations we have left
        if (3 < namedSnapshots.size()
            && random().nextInt(3 + numBackupIters - i) < random().nextInt(namedSnapshots.size())) {
          final String snapshotName = namedSnapshots.poll();
          final String backupName = "backup_as_of_" + snapshotName;
          log.info("Creating {} from {} in iter={}", backupName, snapshotName, i);
          impl.makeBackup(backupName, snapshotName);
          log.info("Deleting {} in iter={}", snapshotName, i);
          impl.deleteSnapshot(snapshotName);

          validateBackup(backupName);

          // NOTE: we can't directly compare our backups, because the stress thread
          // may have added/committed documents
          // ie: backup_as_of_snapshot_4 and backup_currentAt_4 should have the same 4 "real"
          // documents, but they may have other commits that affect the data files
          // between when the backup was taken and when the snapshot was taken

        }
      }

    } finally {
      keepGoing.set(false);
      heavyCommitting.join();
    }
    assertNull(heavyCommitFailure.get());

    {
      log.info("Done with (concurrent) updates, Deleting all docs...");
      final UpdateRequest delAll = new UpdateRequest().deleteByQuery("*:*");
      delAll.setParam(UpdateParams.COMMIT, "true");
      delAll.setParam(UpdateParams.OPTIMIZE, "true");
      delAll.setParam(UpdateParams.MAX_OPTIMIZE_SEGMENTS, "1"); // purge as many files as possible
      final UpdateResponse delRsp = delAll.process(coreClient);
      assertEquals("dellAll status: " + delRsp, 0, delRsp.getStatus());
    }

    { // Validate some backups at random...
      final int numBackupsToCheck = atLeast(1);
      log.info(
          "Validating {} random backups to ensure they are un-affected by deleting all docs...",
          numBackupsToCheck);
      try (Stream<Path> files = Files.list(backupDir)) {
        // insure consistent (arbitrary) ordering before shuffling
        final List<Path> allBackups = files.sorted().collect(Collectors.toList());
        Collections.shuffle(allBackups, random());
        for (int i = 0; i < numBackupsToCheck; i++) {
          final Path backup = allBackups.get(i);
          validateBackup(backup);
        }
      }
    }
  }

  /**
   * Given a backup name, extracts the numeric suffix identifying how many "real" docs should be in
   * it
   *
   * @see #ENDS_WITH_INT_DIGITS
   */
  private static int getNumRealDocsFromBackupName(final String backupName) {
    final Matcher m = ENDS_WITH_INT_DIGITS.matcher(backupName);
    assertTrue("Backup name does not end with int digits: " + backupName, m.find());
    return Integer.parseInt(m.group());
  }

  /**
   * Validates a backup exists, passes check index, and contains a number of "real" documents that
   * match its name
   *
   * @see #validateBackup(Path)
   */
  private void validateBackup(final String backupName) throws IOException {
    final Path backup = backupDir.resolve("snapshot." + backupName);
    validateBackup(backup);
  }

  /**
   * Validates a backup dir exists, passes check index, and contains a number of "real" documents
   * that match its name
   */
  private void validateBackup(final Path backup) throws IOException {
    log.info("Checking Validity of {}", backup);
    assertTrue(backup.toString() + ": isDir?", Files.isDirectory(backup));
    final Matcher m = ENDS_WITH_INT_DIGITS.matcher(backup.getFileName().toString());
    assertTrue("Backup dir name does not end with int digits: " + backup, m.find());
    final int numRealDocsExpected = Integer.parseInt(m.group());

    try (Directory dir = FSDirectory.open(backup)) {
      TestUtil.checkIndex(dir, CheckIndex.Level.MIN_LEVEL_FOR_SLOW_CHECKS, true, true, null);
      try (DirectoryReader r = DirectoryReader.open(dir)) {
        assertEquals(
            "num real docs in " + backup,
            numRealDocsExpected,
            r.docFreq(new Term("type_s", "real")));
      }
    }
  }

  /**
   * Creates a "large" document with lots of fields (to stimulate lots of files in each segment)
   *
   * @param id the uniqueKey
   * @param type the type of the doc for use in the 'type_s' field (for term counting later)
   */
  static SolrInputDocument makeDoc(String id, String type) {
    final SolrInputDocument doc = new SolrInputDocument("id", id, "type_s", type);
    for (int f = 0; f < 100; f++) {
      doc.addField(f + "_s", TestUtil.randomUnicodeString(random(), 20));
    }
    return doc;
  }

  private void initCoreNameAndSolrCoreClient() {
    // Sigh.
    Replica r =
        cluster
            .getSolrClient()
            .getClusterState()
            .getCollection(DEFAULT_TEST_COLLECTION_NAME)
            .getActiveSlices()
            .iterator()
            .next()
            .getReplicas()
            .iterator()
            .next();
    coreName = r.getCoreName();
    coreClient = getHttpSolrClient(r);
  }

  /**
   * API for taking backups and snapshots that can hide the impl quirks of using ReplicationHandler
   * vs CoreAdminHandler (the default)
   */
  private class BackupAPIImpl {
    /** TODO: SOLR-9239, no solrj API for CoreAdmin Backups */
    protected GenericSolrRequest makeCoreAdmin(CoreAdminAction action, SolrParams p) {
      return new GenericSolrRequest(
          GenericSolrRequest.METHOD.POST,
          "/admin/cores",
          SolrParams.wrapDefaults(params(CoreAdminParams.ACTION, action.toString()), p));
    }

    /**
     * Make a backup or the named commit snapshot (or null for latest), and only return if
     * successful
     */
    public void makeBackup(final String backupName) throws Exception {
      makeBackup(backupName, null);
    }

    /** Make a backup or latest commit, and only return if successful */
    public void makeBackup(final String backupName, final String snapName) throws Exception {
      ModifiableSolrParams p =
          params(
              CoreAdminParams.CORE,
              coreName,
              CoreAdminParams.NAME,
              backupName,
              CoreAdminParams.BACKUP_LOCATION,
              backupDir.toString(),
              CoreAdminParams.BACKUP_INCREMENTAL,
              "false");
      if (null != snapName) {
        p.add(CoreAdminParams.COMMIT_NAME, snapName);
      }
      makeCoreAdmin(CoreAdminAction.BACKUPCORE, p).process(adminClient);
      // CoreAdmin BACKUPCORE is synchronous by default, no need to wait for anything.
    }

    /** Make a named snapshot, and only return if successful */
    public void makeSnapshot(final String snapName) throws Exception {
      makeCoreAdmin(
              CoreAdminAction.CREATESNAPSHOT,
              params(
                  CoreAdminParams.CORE, coreName,
                  CoreAdminParams.COMMIT_NAME, snapName))
          .process(adminClient);
      // CoreAdmin CREATESNAPSHOT is synchronous by default, no need to wait for anything.
    }

    /** Delete a named snapshot, and only return if successful */
    public void deleteSnapshot(final String snapName) throws Exception {
      makeCoreAdmin(
              CoreAdminAction.DELETESNAPSHOT,
              params(
                  CoreAdminParams.CORE, coreName,
                  CoreAdminParams.COMMIT_NAME, snapName))
          .process(adminClient);
      // CoreAdmin DELETESNAPSHOT is synchronous by default, no need to wait for anything.
    }
  }
}
